Cluster消息类型定义
1 |
在Redis初始化服务initServer函数中,调用aeCreateTimeEvent注册了时间事件,周期性的执行serverCron函数,在serverCron中可以看到每隔100ms调用一次clusterCron函数,执行Redis Cluster定时任务:
1 | void initServer(void) { |
clusterCron
clusterCron是集群相关的定时执行函数,每100ms执行一次:
- 遍历集群中的所有节点,校验是否有连接中断的节点并进行重新连接
- 如果节点是自身或者是没有地址的节点,跳过
- 如果节点处于握手状态并且已经超时,跳过
- 如果连接为空,调用connConnect进行连接,回调函数为clusterLinkConnectHandler
- 每执行10次clusterCron函数时随机选取五个节点,然后从这五个节点选出最早收到PONG回复的那个节点,也就是找出最久没有进行通信的那个节点,向其发送PING消息,clusterCron每100ms执行一次,执行10次是1000ms,也就是说每1秒选取一个节点调用clusterSendPing函数发送一次PING消息
1 | void clusterCron(void) { |
clusterNode
clusterNode是集群中节点对应的结构体,包含了以下内容:
1 | typedef struct clusterNode { |
clusterLinkConnectHandler
clusterLinkConnectHandler是建立连接的监听函数,当连接建立时会调用clusterLinkConnectHandler进行处理,在clusterLinkConnectHandler函数中可以看到,又调用了connSetReadHandler注册了可读事件的监听,对应的回调函数为clusterReadHandler,当收到其他节点发送的通信消息时会调用clusterReadHandler函数处理:
1 | void clusterLinkConnectHandler(connection *conn) { |
集群间通信
通信消息结构体定义
clusterMsg
集群间通信的消息对应的结构体为clusterMsg,里面包含了消息类型、发送消息节点的slots信息以及节点间通信的消息体clusterMsgData等信息:
1 | typedef struct { |
clusterMsgData
clusterMsgData里面存储了节点间进行通信的实际消息,不同消息类型对应不同的数据结构:
- clusterMsgDataGossip:PING, MEET 和 PONG消息对应的数据结构
- clusterMsgDataFail:FAIL消息对应的数据结构
- clusterMsgDataPublish:PUBLISH消息对应的数据结构
- clusterMsgDataUpdate:UPDATE消息对应的数据结构
- clusterMsgModule:MODULE消息对应的数据结构
1 | union clusterMsgData { |
clusterMsgDataGossip
clusterMsgDataGossip是集群间发送PING、MEET 和 PONG消息对应的数据结构,里面包含以下信息:
1 | typedef struct { |
PING消息的发送
clusterSendPing
clusterSendPing函数用于向指定节点发送PING消息,Ping消息中不仅包含当前节点的信息,也会随机选取一些其他的节点,将其他节点的信息封装在消息体中进行发送,随机选取节点的个数计算规则如下:
wanted
:随机选取的节点个数,默认是集群中节点的数量除以10freshnodes
:随机选取的节点个数的最大值,默认集群中节点的数量减2
如果wanted
小于3,那么将wanted
置为3,也就是最少选取3个节点;
如果wanted
大于freshnodes
,将wanted
置为freshnodes
的值,也就是最大可以选取freshnodes
个节点;
选取的节点个数wanted
确定之后,处理逻辑如下:
调用clusterBuildMessageHdr函数构建消息头
根据
wanted
的数量随机选取节点,处于以下几种情况的节点将被跳过- FAIL下线状态的节点
处于握手状态的节点
没有地址信息的节点
- 失去连接的节点并且没有配置slots.
调用clusterSetGossipEntry函数将选取的节点信息加入到消息体中
调用clusterSendMessage函数发送消息
1 | void clusterSendPing(clusterLink *link, int type) { |
构建消息头
clusterBuildMessageHdr
clusterBuildMessageHdr函数用于构建消息头,设置了消息发送者的节点相关信息:
- 设置了签名、消息类型、节点IP、端口等信息
- 设置发送消息节点的slots信息,如果发送消息的节点是从节点,需要使用它对应的主节点的slots信息
- 计算集群消息的总长度totlen,并设置到消息头中
1 | void clusterBuildMessageHdr(clusterMsg *hdr, int type) { |
构建消息体
clusterSetGossipEntry
clusterSetGossipEntry函数用于构建消息体,将随机选取的其他节点信息加入到ping消息对应的数组hdr->data.ping.gossip[i]中,并设置节点的相关信息:
1 | void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) { |
PING消息的处理
clusterReadHandler
由上面的clusterLinkConnectHandler函数可知,收到其他节点发送的通信消息时会调用clusterReadHandler函数处理,在clusterReadHandler函数中会开启while循环,不断读取数据,直到获取完整的数据(收到的数据长度rcvbuflen等于消息中设置数据总长度时),调用clusterProcessPacket函数处理收到的消息:
1 | void clusterReadHandler(connection *conn) { |
clusterProcessPacket
clusterProcessPacket函数用于处理收到的通信消息,可以看到有许多if else分支,根据消息类型的不同,进行了不同的处理,这里先只关注PING消息的处理:
- 如果消息类型是PING或者MEET,调用clusterSendPing函数发送PONG消息,传入的消息类型为CLUSTERMSG_TYPE_PONG,说明PING和PONG消息都是通过clusterSendPing函数实现的,PING和PONG消息的数据结构一致,那么回复的PONG消息中也会带上回复者的节点信息以及回复者随机选取的其他节点信息,以此达到节点间交换信息的目的
- 如果是PING, PONG或者MEET消息,并且sender不为空,不为空表示发送消息的节点是当前节点已知的,调用clusterProcessGossipSection函数处理消息体中的Gossip数据
1 | int clusterProcessPacket(clusterLink *link) { |
clusterProcessGossipSection
clusterProcessGossipSection函数用于处理clusterMsg中的Gossip节点信息g,它从集群消息中获取Gossip节点数据,根据节点数量进行遍历:
调用clusterLookupNode函数根据nodename从当前收到消息的节点的集群中查找Gossip节点,查找结果记为
node
- 如果
node
如果不为空,说明可以从当前节点的集群中找到,Gossip节点针对当前节点是已知的,需要注意node指向的是当前收到消息节点中维护的相同nodename的节点,g指向当前正在遍历的gossip节点(sender发送的消息中携带gossip数组),注意两者的区别 - 如果
node
如果为空,说明Gossip节点针对当前节点是未知的,之前不在当前节点维护的集群节点中
- 如果
如果
node
不为空,也就是当前收到消息这个节点的集群中已经存在node
节点,进行如下处理:(1)发送消息的节点
sender
是主节点时有以下两种情况:- 如果
node
是FAIL或者PFAIL状态,需要将sender
加入到node
节点的下线链表fail_reports中,表示sender认为node节点下线(clusterNodeAddFailureReport函数) - 断是否有必要将
node
标记为下线状态(markNodeAsFailingIfNeeded函数) - 如果
node
不是FAIL或者PFAIL状态,需要校验node
是否已经在sender
的下线节点链表fail_reports中,如果在需要从中移除
(2)如果
node
节点不是FAIL、PFAIL、NOADDR状态,并且node
的ip或者端口与g
指向的gossip节点中的ip或者端口不一致,需要更新node中的ip和端口- 如果
如果
node
为空,说明之前不在当前节点维护的集群节点中,如果gossip节点不处于NOADDR状态并且不在nodes_black_list中,新建节点,加入到当前收到消息的节点维护的集群数据server.cluster中
1 | void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { |
总结
参考
极客时间 - Redis源码剖析与实战(蒋德钧)
zhaiguanjie-Redis源码剖析
Redis版本:redis-6.2.5